package com.zhang.gmall.realtime.app.function;

import com.alibaba.fastjson.JSONObject;
import com.zhang.gmall.realtime.common.GmallConfig;
import com.zhang.gmall.realtime.utils.DimUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Set;

/**
 * @title:
 * @author: zhang
 * @date: 2022/1/1 23:44
 */
public class DimSinkFunction extends RichSinkFunction<JSONObject> {
    private Connection connection;
    //连接Hbase
    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName(GmallConfig.PHOENIX_DRIVER);
        connection = DriverManager.getConnection(GmallConfig.PHOENIX_SERVER);
        connection.setAutoCommit(true);
    }
    //value:{"sinkTable":"dim_base_trademark","database":"gmall-210325-flink","before":{"tm_name":"atguigu","id":12},"after":{"tm_name":"zhang","id":12},"type":"update","tableName":"base_trademark"}
    //SQL：upsert into db.tn(id,tm_name) values('...','...')
    @Override
    public void invoke(JSONObject value, Context context)  {
        PreparedStatement preparedStatement = null;
        try {
            //获取SQL语句
            String sinkTable = value.getString("sinkTable");
            JSONObject after = value.getJSONObject("after");
            String upsertSql = genUpsertSql(sinkTable, after);
            //打印sql
            System.out.println(upsertSql);
            //预编译sql
            preparedStatement = connection.prepareStatement(upsertSql);

            //判断如果当前数据为更新操作，则先删除Redis中的数据
            if ("update".equals(value.getString("type"))){
                DimUtil.delRedisDimInfo(sinkTable.toUpperCase(),after.getString("id"));
            }


            //执行插入操作
            preparedStatement.executeUpdate();
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            if (preparedStatement!=null){
                try {
                    preparedStatement.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    private String genUpsertSql(String sinkTable, JSONObject data) {

        Set<String> keySet = data.keySet();
        Collection<Object> values = data.values();

        return "upsert into "+GmallConfig.HBASE_SCHEMA+"."+sinkTable+"("+
                StringUtils.join(keySet,",")+") values('"+
                StringUtils.join(values,"','")+"')";
    }
}
